CloseHooks机制
CloseHooks
是Vert.x中一项内部使用的特性,在Vertx实例或者Verticle关闭时,CloseHooks
能实现自动清理(比如HTTP Server,Net Server…)的功能。
我们了解下CloseHooks
实现的大概流程。
在我们的Verticle
或者Vertx
实例部署好以后,我们通常会使用vertx.createHttpServer()
去部署一些HTTP Server或者Net Server,Net Client等等。这些都是可以在关联的Verticle或者Vertx实例关闭之后实现自动清理的资源。
下面以在Verticle上的Net Server为例
①通过vertx.createNetServer()
创建一个NetServer,返回一个NetServerImpl
实例
②在NetServerImpl
构造器中,会将当前Verticle
所在Context
hook上这个NetServer。
|
|
NetServerImpl
实现了Closeable
接口,Closeable
接口只有一个close
方法。
Context
的addCloseHook()
方法就是将实现了Closeable
接口的资源添加到自己内部类CloseHooks
中closeHooks
的这个Set
中。
③Verticle
在unDeploy()
以后,会将自己持有的Context对象的CloseHooks
对象执行run()
方法,就是对每一个Set中Closeable
对象调用close()
方法,这样就能实现自动清理所有hook上的资源的功能。
这种机制其实是一种观察者模式的实现,Verticle
的Context
或者Vertx
实例是被观察者,可以添加观察者Closeable
,在订阅的关系形成之后,一旦Verticle
或者Vertx
关闭,那么就会通知观察者(可以关闭的资源)去执行它们自己的close()
方法,这样就可以实现自动清理的机制。
周期任务与延时任务机制
Vert.x提供了一些API用来执行周期任务或者延时任务,在Vertx
接口中定义好了以下API
我们可以使用这些API非常简单的执行一些延时任务、周期任务或者取消任务的操作。
实现
setTimer()与setPeriodic()
在VertxImpl
类中找到setTimer()
与setPeriodic()
方法的实现。
这两个方法都是调用scheduleTimeout()
方法,接着看这个方法的实现
这个方法返回的是我们创建的任务的timerId
,这个id可以用在cancelTimer()
方法取消该任务。ID是通过原子long类型变量timeoutCounter
生成,每次新增一个任务时,这个原子变量就会取值并且自增1。然后创建一个InternalTimerHandler
对象,我们马上会介绍这个类。id与task对象组成的kv会被添加到timeouts
这个Map中,并且context会增加一个属于task的CloseHook。
InternalTimerHandler
InternalTimerHandler
是VertxImpl
的内部类,实现了Handler
与Closeable
接口。我们可以看下这个类的结构
handle()
由于实现了Handler
,那么就会有handle()
方法,我们看handle()
代码
这段代码不复杂,就是检查当前任务是否被取消,如果没有取消,就去执行持有的handler的逻辑,这个handler对象是在构造器传入的,通过this.handler = runnable;
将我们最开始在调用API时写的handler保存到这个对象的handler对象中,handler执行完以后如果不是周期任务就执行cleanupNonPeriodic
清理。
Constructor
我们注意到之前传递给InternalTimerHandler
构造器参数包括了任务ID,任务要执行的Handler,是否为周期任务的布尔值,执行间隔(延时),以及调用的context。我们直接看构造器中代码
通过调用的context获取关联的EventLoop
(Netty),将context.runOnContext(this)
这个方法封装成一个Runnable对象,this
指的就是自己InternalTimerHandler
对象,因为这个对象也是一个Handler
对象,然后将Runnable对象提交给EventLoop
的任务队列处理,如果是周期任务就使用scheduleAtFixedRate()
提交,延时任务就使用schedule()
提交。这些任务提交后的返回值是一个ScheduledFuture
对象,保存在future
对象中。
cancel()
cancel()
方法就是通过刚才保存的future
对象来控制取消,可以调用Future
的cancel()
方法来取消任务。
cancelTimer
cancelTimer(long id)
这个API就是委托InternalTimerHandler
对象的cancel()
方法完成取消任务。
timerStream与periodicStream
除了直接创建任务之外,还可以使用提供的API创建一个延时流或者周期流。注意这个流只能执行一个你指定的任务(延时任务或者周期任务),但是你可以对流进行一些常用的操作。
开启TimeoutStream
的代码,返回的是一个TimeoutStreamImpl
对象。
TimeoutStreamImpl
类实现了TimeoutStream
接口与Handler<Long>
接口。`
TimeoutStream
继承了ReadStream
接口,提供一些常用的流操作的接口
handle()
我们看handle()
方法,如果流没有停止,那么就会使用该对象持有的handler去执行逻辑,如果是延时任务并且注册了endHandler
,那么执行完handler后还会执行endHandler。
handler()
handler()
方法为这个TimeoutStream
注册一个handler,当触发了时间事件时(延时或者周期),这个注册的handler就会执行。如果注册的handler是null,那么就取消这个任务。
看实现就知道一个TimeoutStream
是不可以注册多个任务的,会抛出IllegalStateException
。设置任务的工作还是交给了scheduleTimeout()
这个方法去处理。和之前有点不同的是传入参数handler是TimeoutStreamImpl
对象,通过多态机制TimeoutStreamImpl
在这里扮演了handler
的角色。
TimeoutStreamImpl
上面写了一些注释
This class is optimised for performance when used on the same event loop that is was passed to the handler with. However it can be used safely from other threads. The internal state is protected using the synchronized keyword. If always used on the same event loop, then we benefit from biased locking which makes the overhead of synchronized near zero.
尽管大量使用了synchronized
关键字,但由于JDK1.6后偏向锁的优势,如果总是在一个EventLoop线程上,而该EventLoop线程如果没有被其他线程抢占锁,这时偏向锁是偏向模式,那么这个线程再进行请求锁时,就不用再一次进行同步操作,这样开销会降低从而避免了性能由于重量级锁急剧下降,同时偏向锁也能保证程序的线程安全执行。
Netty任务队列机制
上面说到了我们的延时任务或者周期任务都是通过提交给EventLoop
实现的,下面以NioEventLoop
为例介绍Netty中的任务队列的机制。
我们先大致看一下NioEventLoop
的结构
可以看到层次非常多,我们针对任务队列自底向上进行分析。
execute()执行任务
execute()
方法之前在vert.x context接触过,就是将一个任务封装成Runnable对象提交到任务队列。execute()
方法在Executor
接口中定义,我们在SingleThreadEventExecutor
找到它的实现如下
|
|
实际上execute()
方法只是执行了addTask(task)
添加任务。
添加任务代码如下
我们的任务都被通过offer()
操作提交到队列taskQueue
中了,而这个任务队列的定义是private final Queue<Runnable> taskQueue;
,任务队列维护着一组Runnable对象。
而在一次NioEventLoop循环中,即无限循环run()
方法逻辑中,调用runAllTasks()
方法去执行任务队列中的任务,而这些任务就是通过pollTaskFrom(taskQueue)
从任务队列中取出Runnable对象,最后调用safeExecute(task);
直接run()
执行。这些从任务队列取出任务去执行的逻辑即runAllTasks()
都是在SingleThreadEventExecutor
类中实现的。
schedule调度执行任务
在EventLoop
中,由于NioEventLoop
实现了ScheduledExecutorService
接口,因此有该接口的调度任务的方法。
这几个调度执行任务的方法都是可以在EventLoop
上调用的。Schedule具体的实现是在AbstractScheduledEventExecutor
中完成的。下面我们以Vert.x中的周期任务与延时任务为例分析scheduleAtFixedRate()
与schedule()
的实现。
添加任务
我们关注上面调度方法①与④的实现部分,在AbstractScheduledEventExecutor
中找到对应实现。
方法①的返回值如下
|
|
方法④的返回值如下
|
|
它们都返回了一个接受参数ScheduledFutureTask
的重载方法,我们先看重载方法的实现
|
|
这个重载方法就是将刚才传入的ScheduledFutureTask
对象加入到调度任务队列scheduledTaskQueue
中,并且返回这个任务。注意到Netty中很多方法都是命令与查询不分离的,即方法不仅仅是返回结果,还有对结果的操作,这个重载方法就干了命令与查询两件事。
ScheduledFutureTask
接下来我们看一看刚才加入到调度任务队列中的ScheduledFutureTask
是干什么用的。
我们先从构造器开始,先看周期任务创建的ScheduledFutureTask
调用的构造器
首先会接收一个AbstractScheduledEventExecutor
对象作为第一个参数,指定了该任务执行的Executor,然后是一个Callable
对象,这个Callable对象是将我们提交的Runnable
对象通过Executors.<Void>callable(command, null)
转换而得,这个command代表我们需要执行的任务,而null值代表结果(由于Runnable执行无法直接获取结果,转换成Callable后用null代替结果)。第三个参数是该周期任务下一次执行(也有可能是第一次)的开始时间,在调用处使用ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay))
静态方法计算而得,第四个参数是周期任务的周期时间,注意时间的单位都会被转换成纳秒,并且用nanoTime方式表示。
我们可以看到ScheduledFutureTask
的作用,就是将一个调度任务执行的Executor,执行时间,周期信息保存在该对象中。
同样的,对于定时任务创建ScheduledFutureTask
第一个参数还是一样,表示该定时任务执行的Executor,Callable
对象通过toCallable(runnable, result)
转换而得,第三个参数是定时任务执行的实际nanoTime时间,周期时间当然是0,这些信息同样也被保存在ScheduledFutureTask
对象中。
现在我们了解了ScheduledFutureTask
的作用,接下来分析执行过程。
执行调度任务
回到NioEventLoop
类的循环run()
方法,注意到任务队列处理的地方,会根据ioRatio
进行判断,如果ioRatio
是100,那么就执行runAllTasks()
无参方法;否则就执行runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
方法。
我们先看runAllTasks()
无参方法,选取关键部分代码
|
|
在分析上面代码之前,我们先注意fetchFromScheduledTaskQueue()
方法
|
|
fetchFromScheduledTaskQueue()
方法就是将调度任务队列的任务抽取出来,然后加入到普通任务队列中,如果普通任务队列满了,调度任务就退回原来的调度任务队列,接着继续循环一直到调度任务队列取不出来为止。
这里要特别关注抽取方法pollScheduledTask()
方法,因为这个方法就是用来确定我们的调度任务是否满足时间触发条件的,在这个方法中会将要取出的调度任务的下一次运行时间(定时运行时间) 与当前时间进行比较,如果满足了时间条件,那么才会把这个调度任务从队列中取出。而我们的调度任务中已经保存了周期任务的下一次执行时间(或者定时任务的定时时间)的信息。
|
|
然后回到之前runAllTasks()
方法,现在知道了fetchedAll
表示调度任务队列中可以取出任务是否全部被取出,而上面代码的目的就是将调度任务队列所有能取出的任务放入普通任务队列中,然后runAllTasksFrom(taskQueue)
执行所有任务。
还有一点要想起的是,我们的周期任务是怎么周期运行的呢?还要回到ScheduledFutureTask
类,找到run()
方法,可以见到如果周期时间periodNanos
被设置为0,那么这个任务就是个定时任务,那么仅仅执行task.call()
一次即可;而如果周期时间不为0,那么任务就是周期任务了,这个时候除了执行task.call()
,还会修改deadlineNanos
,把下一次执行时间累加上这个周期任务的周期时间,接着把修改好的任务再插入到调度任务队列中去,这样我们的周期任务也能周期执行了,可以对照下面的代码。
|
|
再来分析ioRatio
不为100的情况,即调用runAllTasks(long timeoutNanos)
方法。基本的逻辑都一样,首先将满足条件的调度任务从调度任务队列取出到普通任务队列,接着执行普通任务。
不过这里多了一步检查的逻辑,它会对每个执行过的任务进行计数,每当执行的任务计数到达64时(用位与来实现),就会将执行这些任务的总时间与分配的时间进行对比,如果超出分配时间了,那么这次循环就不再执行任务队列中的任务,这个分配的时间是由io事件执行时间(processSelectedKeys()
这个方法执行时间)与ioRatio计算所得,代码ioTime * (100 - ioRatio) / ioRatio
比较直观。需要提醒的是,ioRatio
被设置为100的时候,意味着我们取消了超时检测的机制,这时EventLoop会处理所有当前任务队列中的任务。
小结
在本文中一开始介绍了Vert.x的CloseHooks
的机制,接着对Vert.x周期任务,定时任务的源码进行了探索,还对Netty中事件循环的任务调度处理机制进行了仔细的分析。现在我们应该明白了Vert.x中周期任务与定时任务的原理。